Design Refinements in MapReduce: Part II

Let's analyze further refinements that provide insight into a job's status, support debugging, and handle errors efficiently in the MapReduce design.

We can incorporate the following refinements to gain insight into our system’s status and performance, along with error-handling mechanisms and debugging facilities. All of these refinements supplement the previously covered ones and improve the overall efficiency of the design.

Status information#

Even with all the distribution and parallelization, a MapReduce job is a time-consuming process. For example, the best Hadoop (an open-source implementation of Google’s MapReduce library) performance to date for processing 102.5 TB of data is 4,328 seconds (about 1.2 hours), achieved by Thomas Graves of Yahoo! Inc. He used the following configuration for this task: 2,100 nodes, each with two 2.3 GHz hex-core Xeon E5-2630 processors, 64 GB of memory, and 12 x 3 TB disks.

It’s beneficial for the users to access the status of their MapReduce jobs to get insights and make crucial decisions in case any modifications are required.

Status pages#

The manager houses an internal HTTP server and provides users access to a set of status pages. These status pages present the computation progress, such as the number of completed tasks, the number of in-progress tasks, input data size, intermediate data size, output data size, processing rates, etc.

These pages also contain information about the number of failed tasks, the workers they were running on, and which Map or Reduce tasks they were processing, along with links to the standard errors.

These status pages also provide users with links to the standard output files generated by each task.

[Figure: Status page access via an HTTP server. The manager’s internal HTTP server serves status pages to the user interface, showing the computation progress (completed tasks, in-progress tasks, input, intermediate, and output data sizes in bytes), failed tasks (errors and the IDs of the workers having the errors), and links to the output files.]

Users can analyze this data to infer the following information:

  • How long the computation will take
  • Whether more resources need to be added to the computation
  • The individual processing rates of workers

Counters#

The MapReduce library has a built-in facility to count the occurrences of various events. It provides some counters by default, such as the number of input key-value pairs processed and output key-value pairs generated.

In addition to these default counters, users can define customized counters by creating a counter object in the user code and incrementing it appropriately in the Map or Reduce functions.
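As a sketch, a user-defined counter might look like the following. This is illustrative Python against a minimal, hypothetical MapReduce-style interface; the `Counter` class, the `map_fn` signature, and the language check are assumptions, not the actual library API:

```python
class Counter:
    """Hypothetical per-worker counter that the library would
    periodically report to the manager (illustrative, not the real API)."""
    def __init__(self, name):
        self.name = name
        self.value = 0

    def increment(self, n=1):
        self.value += n

# User code: count German documents while mapping.
german_docs = Counter("german-documents")

def map_fn(doc_id, text):
    if text.startswith("de:"):          # stand-in language check
        german_docs.increment()
    for word in text.split():
        yield (word, 1)

# Drive the map function over a toy input.
docs = [("d1", "de: hallo welt"), ("d2", "en: hello world")]
pairs = [kv for did, txt in docs for kv in map_fn(did, txt)]
print(german_docs.value)
```

In the real library, the counter object lives in the worker process and its value travels to the manager with the periodic ping responses, as described next.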

Examples

The user might need the number of processed words or the number of indexed German documents.

Process of accumulating the counter output#

Each worker houses a local counter object and periodically sends the value to the manager by piggybacking it on the ping response. The manager aggregates the individual counter values from successful Map and Reduce tasks and returns the accumulated value upon MapReduce job completion.

The manager accumulates the counter values from individual workers

When aggregating the counter values, the manager avoids double counting by eliminating the effects of double execution, which might result from backup tasks or re-execution in case of failures.
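The deduplication step can be sketched as follows, assuming the manager keys each report by task ID so that a repeated report from a backup or re-executed task overwrites, rather than adds to, the earlier one (class and method names are illustrative):

```python
from collections import defaultdict

class Manager:
    """Toy sketch of manager-side counter aggregation."""
    def __init__(self):
        # counter name -> {task_id: value from the successful attempt}
        self.reports = defaultdict(dict)

    def on_ping(self, task_id, counters):
        for name, value in counters.items():
            # Keyed by task ID: a backup execution's report replaces
            # the earlier one instead of being added twice.
            self.reports[name][task_id] = value

    def totals(self):
        return {name: sum(per_task.values())
                for name, per_task in self.reports.items()}

m = Manager()
m.on_ping("map-1", {"words": 10})
m.on_ping("map-2", {"words": 7})
m.on_ping("map-1", {"words": 10})   # backup execution: no double count
print(m.totals())                   # {'words': 17}
```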

Note: The users can also see the current counter value by accessing the manager’s status page.

Applying the counters#

This counter facility enables the sanity checking of the MapReduce job behavior. For example, a user may want to ensure that the number of output pairs of the map phase and the number of processed input pairs of the reduce phase is precisely equal or that the percentage of processed German documents lies within a reasonable range of the total number of processed documents.
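As a minimal sketch, such sanity checks might be written against the final counter totals. The counter names and numbers below are made up for illustration:

```python
# Hypothetical counter totals returned after job completion
# (illustrative values, not real output).
totals = {
    "map-output-pairs": 120,
    "reduce-input-pairs": 120,
    "german-docs": 9,
    "total-docs": 100,
}

# The map phase's output pairs should exactly match the reduce
# phase's input pairs; a mismatch means pairs were lost or duplicated.
assert totals["map-output-pairs"] == totals["reduce-input-pairs"]

# The fraction of German documents should lie in a plausible range.
german_share = totals["german-docs"] / totals["total-docs"]
assert 0.0 <= german_share <= 0.2
print("sanity checks passed")
```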

The following refinement improves the system’s error-handling capabilities.

Skipping bad records#

We might have to deal with faulty datasets, causing our program to deterministically crash on some records. These are referred to as bad records. Bugs like these halt the completion of MapReduce jobs.

We can attempt to fix the bug, but sometimes this is not feasible. The bug might be in a third-party library whose source code we cannot access. In such cases, the only feasible approach is to identify and skip the faulty records.

Note: While analyzing a large dataset, it might be acceptable to ignore a few records if doing so does not affect the outcome.

The MapReduce library provides an optional execution mode that detects records causing deterministic crashes and skips them so that the remaining records can be processed.

Process of skipping bad records#

Each worker installs a signal handler to catch segmentation faults and bus errors. Before a record is processed by a Map or Reduce function, the MapReduce library saves the record’s sequence number in a global variable.

If a specific record causes an error, the signal handler sends a last-gasp UDP packet containing that sequence number to the manager. If the manager receives more than a threshold number of error signals for a particular record, it initiates the protocol to eliminate that record from future executions of the corresponding Map or Reduce tasks.
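A simplified sketch of this protocol is shown below. Ordinary Python exceptions stand in for the signal handler, and a direct function call stands in for the last-gasp UDP packet; the threshold is set to 1 purely for illustration:

```python
THRESHOLD = 1          # deliberately low, for illustration only
failure_counts = {}    # record sequence number -> observed crashes
skip_set = set()       # records the manager has eliminated

def report_failure(seq):
    """Manager side: count the crash and decide whether to skip."""
    failure_counts[seq] = failure_counts.get(seq, 0) + 1
    if failure_counts[seq] > THRESHOLD:
        skip_set.add(seq)

def run_task(records, process):
    """Worker side: process each record, reporting crashes (an
    exception here stands in for a segfault caught by a handler)."""
    done = []
    for seq, rec in records:
        if seq in skip_set:
            continue                   # eliminated by the manager
        try:
            done.append(process(rec))
        except Exception:
            report_failure(seq)        # "last gasp" to the manager
    return done

def process(rec):
    if rec == "bad":
        raise ValueError("deterministic crash")
    return rec.upper()

records = [(1, "ok"), (2, "bad")]
run_task(records, process)             # attempt 1: record 2 crashes
run_task(records, process)             # attempt 2: count exceeds threshold
out = run_task(records, process)       # attempt 3: record 2 is skipped
print(out, skip_set)                   # ['OK'] {2}
```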

[Slides: Process of skipping bad records]
1. Successful execution of record 001 in the Map() function
2. A failure is caused by record 002 in the Map() function
3. A failure is caused by record 001 in the Reduce() function
4. A failure is caused by record 002 in the Reduce() function. Record 002 is eliminated as the count exceeds 1 (we set this low threshold for the sake of explanation)

The following refinement enables the user to perform program testing and debugging.

Local execution#

Since MapReduce is a distributed system that runs on several thousand machines centrally controlled by a manager, tracing and fixing errors can be tricky. To overcome this challenge and facilitate debugging, profiling, and small-scale testing, we provide users with an alternative local execution mode that runs all the tasks on a single machine.

Advantages of local execution

It gives users full autonomy: they can limit the Map tasks to a specific number, and they can execute their programs with a special debugging flag or testing utility to test the desired functionality. Once a program is tested locally, users can run it at full scale.
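Local execution can be approximated by a sequential, single-process runner like the following sketch. This is a toy stand-in for illustration, not the actual library's local mode; the `max_map_tasks` parameter mimics the ability to limit the number of Map tasks:

```python
from collections import defaultdict

def local_mapreduce(inputs, map_fn, reduce_fn, max_map_tasks=None):
    """Run an entire MapReduce job sequentially in one process,
    which makes it easy to step through with a debugger."""
    if max_map_tasks is not None:
        inputs = inputs[:max_map_tasks]       # user-limited task count
    intermediate = defaultdict(list)
    for key, value in inputs:                 # "map phase"
        for k, v in map_fn(key, value):
            intermediate[k].append(v)
    return {k: reduce_fn(k, vs)               # "reduce phase"
            for k, vs in sorted(intermediate.items())}

# Word count, run entirely locally.
def wc_map(_, line):
    for w in line.split():
        yield (w, 1)

def wc_reduce(_, counts):
    return sum(counts)

result = local_mapreduce([("f1", "a b a"), ("f2", "b")], wc_map, wc_reduce)
print(result)   # {'a': 2, 'b': 2}
```

Because everything runs in one process, a user can set breakpoints inside `wc_map` or `wc_reduce` before deploying the same functions at full scale.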

In this lesson, we’ve discussed our refinements to the design. In the next lesson, we’ll evaluate our system.
